# export PYSPARK_PYTHON=/usr/bin/python3
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark import SparkContext
# Spark entry points (pre-2.0 style: SparkContext wrapped by an SQLContext).
sc = SparkContext()
sqlContext = SQLContext(sc)
# SFPD incident-report CSV covering 2003 through May 2018.
dataPath = '../Police_Department_Incident_Reports__Historical_2003_to_May_2018.csv'
# Explicit schema for the incident CSV; every column is nullable.
_CRIME_COLUMNS = [
    ("IncidntNum", LongType()),
    ("Category", StringType()),
    ("Descript", StringType()),
    ("DayOfWeek", StringType()),
    ("Date", StringType()),
    ("Time", StringType()),
    ("PdDistrict", StringType()),
    ("Resolution", StringType()),
    ("Address", StringType()),
    ("X", DoubleType()),
    ("Y", DoubleType()),
    ("Location", StringType()),
    ("PdId", LongType()),
]
crimeDataSchema = StructType(
    [StructField(name, dtype, True) for name, dtype in _CRIME_COLUMNS])
# Load the CSV with the explicit schema and cache it: every analysis below
# re-scans this DataFrame.
crimeDF = (sqlContext.read
           .option('header', 'true')
           .option('delimiter', ',')
           .format('csv')
           .load(dataPath, schema=crimeDataSchema)
           .cache())
# crimeDF.take(1)
# Let's first understand what types of crimes there are, and the frequency of each.
# Frequency of each crime category, least common first.
crime_types = crimeDF.groupBy('Category').count()
# groupBy().count() names the aggregate column 'count'. Column resolution in
# Spark SQL is case-insensitive, but Row attribute access (row.Count) is
# case-sensitive, so rename the column explicitly.
category_rows = (crime_types
                 .withColumnRenamed('count', 'Count')
                 .select('Category', 'Count')
                 .orderBy('Count', ascending=True)
                 .collect())
category_counts = [(row.Category, row.Count) for row in category_rows]
import matplotlib.pyplot as plt
import numpy as np

# Horizontal bar chart of incident counts per category; category_counts is
# sorted ascending, so the most common crime ends up as the top bar.
labels = [pair[0] for pair in category_counts]
counts = [pair[1] for pair in category_counts]
positions = np.arange(len(labels))
plt.figure(figsize=(10, 15))
plt.barh(positions, counts, align='center')
plt.yticks(positions, labels)
plt.title('Crime Counts')
plt.xlabel('Counts')
# Annotate each bar with its exact count, nudged just past the bar's end.
for pos, count in enumerate(counts):
    plt.text(count + 1000, pos - 0.1, str(count))
plt.show()
# Crime categories ordered most-common first (drives subplot ordering below).
# Fixes: sort BEFORE projecting the sort column away (ordering by a dropped
# column is fragile), and rename 'count' so the name matches case-sensitively.
crimes = (crimeDF.groupBy('Category')
          .count()
          .withColumnRenamed('count', 'Count')
          .orderBy('Count', ascending=False)
          .select('Category')
          .collect())
crimes = [row.Category for row in crimes]
print("Crimes: ", crimes)
# Distinct police districts; some records carry a null district, drop those.
district_rows = (crimeDF.groupBy('PdDistrict')
                 .count()
                 .select('PdDistrict')
                 .collect())
districts = [row.PdDistrict for row in district_rows if row.PdDistrict is not None]
print("Districts: ", districts)
# Incident counts per (category, district) pair, as plain Python tuples.
# groupBy().count() names the column 'count'; rename it so the case-sensitive
# Row attribute access (r.Count) resolves.
category_district_count = (crimeDF.groupBy('Category', 'PdDistrict')
                           .count()
                           .withColumnRenamed('count', 'Count')
                           .select('Category', 'PdDistrict', 'Count')
                           .collect())
category_district_count = [(r.Category, r.PdDistrict, r.Count) for r in category_district_count]
# NB: "TREA" is defined as "Trespassing or loitering near posted industrial property".
# Create mapping from text label to numeric index for the 2-D heatmap.
crime_index = {crime: index for (index, crime) in enumerate(crimes)}
district_index = {district: index for (index, district) in enumerate(districts)}
# Rows = crime categories, columns = police districts.
heatmap_grid = np.zeros([len(crimes), len(districts)])
for (crime, dist, count) in category_district_count:
    # Skip records with no district; idiomatic 'is not None' (was 'not dist is None').
    if dist is not None:
        heatmap_grid[crime_index[crime]][district_index[dist]] = count
import matplotlib.patheffects as PathEffects

# Heatmap of incident counts per (category, district) cell.
fig, ax = plt.subplots(figsize=(30, 50))
im = ax.imshow(heatmap_grid, cmap='hot')
ax.set_xticks(np.arange(len(districts)))
ax.set_yticks(np.arange(len(crimes)))
ax.set_xticklabels(districts)
ax.set_yticklabels(crimes)
ax.xaxis.tick_top()
plt.setp(ax.get_xticklabels(), ha="center")
# Overlay each cell's count; the black stroke keeps the white text readable
# on light cells of the 'hot' colormap.
for i in range(len(crimes)):
    for j in range(len(districts)):
        text = ax.text(j, i, heatmap_grid[i, j],
                       ha="center", va="center", color="w")
        text.set_path_effects([PathEffects.withStroke(linewidth=5, foreground='black')])
ax.set_title("Crimes committed per district")  # fixed typo: 'comitted'
fig.tight_layout()
plt.show()
# First cache a DF with an actual date object
from pyspark.sql.functions import udf
import datetime
def parseDate(dateStr):
    """Parse a 'M/D/YYYY' date string into a datetime.date.

    Uses strptime instead of hand-rolled split/int parsing; %m/%d accept
    non-zero-padded values, matching the incident CSV's format.
    """
    return datetime.datetime.strptime(dateStr, "%m/%d/%Y").date()
# Replace the string Date column with a real date object, keeping only the
# three columns the time-series analyses below need.
date_udf = udf(parseDate, DateType())
crime_with_date = (crimeDF
                   .select("Category",
                           date_udf(crimeDF.Date).alias("Date"),
                           "PdDistrict")
                   .cache())
crime_with_date.printSchema()
crime_with_date.take(5)
# This section analyzes the rise and fall in popularity of various crimes over 15 years.
# Year extracted from the parsed Date column. The lambda parameter is renamed
# from 'datetime' (which shadowed the datetime module) to 'd'.
extract_year_udf = udf(lambda d: d.year, LongType())
# Yearly incident counts per (category, district). groupBy().count() names
# the column 'count'; rename so Row.Count (case-sensitive) resolves.
crime_with_year = (crime_with_date
                   .withColumn("Year", extract_year_udf(crime_with_date.Date))
                   .groupBy("Year", "Category", "PdDistrict")
                   .count()
                   .withColumnRenamed("count", "Count")
                   .select("Year", "Category", "PdDistrict", "Count")
                   .orderBy("Year", ascending=True)
                   .cache())
crime_with_year.take(10)
from matplotlib.ticker import MaxNLocator

x = range(2003, 2019)  # 16 years of data, [2003, 2019)
fig = plt.figure(figsize=(15, 100))
# Collect the aggregated rows once and bucket them in Python: the original
# ran a separate filter().collect() Spark job for every (crime, district)
# pair, i.e. len(crimes) * len(districts) jobs over the same small result.
year_rows_by_group = {}
for row in crime_with_year.collect():
    year_rows_by_group.setdefault((row.Category, row.PdDistrict), []).append(row)
# One stacked-area subplot per crime category, stacked by district.
for i, crime in enumerate(crimes):
    y = []
    for district in districts:
        district_crime_counts = [0] * (2019 - 2003)
        for year_row in year_rows_by_group.get((crime, district), []):
            district_crime_counts[year_row.Year - 2003] = year_row.Count
        y.append(district_crime_counts)
    # NOTE(review): the 20x2 grid supports at most 40 categories -- confirm
    # len(crimes) <= 40 for this dataset.
    fig.add_subplot(20, 2, i + 1)
    plt.gca().xaxis.set_major_locator(MaxNLocator(integer=True))
    plt.stackplot(x, y, labels=districts)
    plt.legend(loc='upper left')
    plt.title(crime)
    plt.ylabel("Counts")
plt.show()
# NB: Data for 2018 is not complete, hence the downward trend for all crimes in 2018.
# This section analyzes how the month of the year impacts the frequency of different criminal activities.
# Month extracted from the parsed Date column (lambda parameter renamed from
# 'datetime', which shadowed the datetime module).
extract_month_udf = udf(lambda d: d.month, LongType())
# Monthly incident counts per category and year. groupBy().count() names the
# column 'count'; rename so Row.Count (case-sensitive) resolves.
crime_with_month_year = (crime_with_date
                         .withColumn("Year", extract_year_udf(crime_with_date.Date))
                         .withColumn("Month", extract_month_udf(crime_with_date.Date))
                         .groupBy("Year", "Month", "Category")
                         .count()
                         .withColumnRenamed("count", "Count")
                         .select("Year", "Month", "Category", "Count")
                         .orderBy("Month", ascending=True)
                         .cache())
crime_with_month_year.take(10)
# Sum counts per (year, month, category) key. The keys are already unique
# after the groupBy above, so the combiner rarely fires -- but the original
# lambda (a[1] + b[1]) would crash on int values if it ever did; the correct
# reduce over plain int counts is a + b.
year_month_stats = (crime_with_month_year.rdd
                    .map(lambda x: ((x.Year, x.Month, x.Category), x.Count))
                    .reduceByKey(lambda a, b: a + b))
year_month_stats = year_month_stats.collect()
# <---2003 ----- 2018--->
# ^
# | jan
# | ...
# | dec
# v
# z = [
# [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
# [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
# [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
# [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
# [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
# [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
# [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
# [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
# [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
# [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
# [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
# [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]]
# Per-crime accumulator grid: rows = months (Jan..Dec), cols = years (2003..2018).
crime_radial_plot = {}
for crime in crimes:
    crime_radial_plot[crime] = np.zeros([12, 16])
for ((year, month, crime), count) in year_month_stats:
    crime_radial_plot[crime][month - 1][year - 2003] += count
# One polar "clock" heatmap per crime: angle = month, radius = year.
for crime_name, grid in crime_radial_plot.items():
    theta, r = np.mgrid[0:2*np.pi:13j, 0:1:17j]
    fig, ax = plt.subplots(figsize=(7, 7), subplot_kw=dict(projection='polar'))
    mesh = ax.pcolormesh(theta, r, grid, cmap='YlGn')
    fig.colorbar(mesh, ax=ax)
    ax.set_title(crime_name)
    # Rotate so January's slice sits at 12 o'clock and months run clockwise.
    ax.set_theta_offset(np.pi/2 - np.pi/12)
    ax.set_theta_direction(-1)
    ax.set_yticks(np.arange(0, 1, 1/17))
    ax.set_yticklabels([str(year) for year in range(2003, 2019)])
    ax.set_xticklabels(['', 'Feb', '', 'May', '', 'Aug', '', 'Nov'])
    ax.set_rlabel_position(-4.5*np.pi)
# Each slice of the pie represents a month, each ring represents a year, and
# the color intensity represents the frequency of the crime.
# Weekday name -> 0-based index (Monday first); used to bucket daily counts.
day_idx = {day: position for position, day in enumerate(
    ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"])}
# Day-of-week incident counts per (category, district). groupBy().count()
# names the column 'count'; rename so Row.Count (case-sensitive) resolves.
crime_with_day = (crimeDF
                  .groupBy("DayOfWeek", "Category", "PdDistrict")
                  .count()
                  .withColumnRenamed("count", "Count")
                  .select("DayOfWeek", "Category", "PdDistrict", "Count")
                  .orderBy("DayOfWeek", ascending=True)
                  .cache())
x = range(0, 7)  # one bucket per weekday, Monday..Sunday
fig = plt.figure(figsize=(15, 100))
# Collect once and bucket in Python: the original ran one filter().collect()
# Spark job per (crime, district) pair over the same small result set.
day_rows_by_group = {}
for row in crime_with_day.collect():
    day_rows_by_group.setdefault((row.Category, row.PdDistrict), []).append(row)
# One stacked-area subplot per crime category, stacked by district.
for i, crime in enumerate(crimes):
    y = []
    for district in districts:
        district_crime_counts = [0] * 7
        for day_row in day_rows_by_group.get((crime, district), []):
            district_crime_counts[day_idx[day_row.DayOfWeek]] = day_row.Count
        y.append(district_crime_counts)
    # NOTE(review): the 20x2 grid supports at most 40 categories.
    fig.add_subplot(20, 2, i + 1)
    plt.gca().xaxis.set_major_locator(MaxNLocator(integer=True))
    plt.xticks(np.arange(7), day_idx)
    plt.stackplot(x, y, labels=districts)
    plt.legend(loc='upper left')
    plt.title(crime)
    plt.ylabel("Counts")
    plt.xlabel("day")
plt.show()
def parseTime(timeStr):
    """Return the hour component (0-23) of an 'HH:MM' time string.

    The minute component was parsed but never used in the original; only the
    hour matters for the hourly histograms below.
    """
    return int(timeStr.split(":")[0])
# Replace the 'HH:MM' Time string with its integer hour, keeping only the
# three columns the hourly analysis needs.
hour_udf = udf(parseTime, IntegerType())
timeDF = (crimeDF
          .select("Category",
                  hour_udf(crimeDF.Time).alias("Time"),
                  "PdDistrict")
          .cache())
# Hourly incident counts per (category, district). groupBy().count() names
# the column 'count'; rename so Row.Count (case-sensitive) resolves.
crime_with_hour = (timeDF
                   .groupBy("Time", "Category", "PdDistrict")
                   .count()
                   .withColumnRenamed("count", "Count")
                   .select("Time", "Category", "PdDistrict", "Count")
                   .orderBy("Time", ascending=True)
                   .cache())
x = range(0, 24)  # 24 hourly buckets, [0, 24) -- the old comment said 'years'
fig = plt.figure(figsize=(15, 100))
# Collect once and bucket in Python: the original ran one filter().collect()
# Spark job per (crime, district) pair over the same small result set.
hour_rows_by_group = {}
for row in crime_with_hour.collect():
    hour_rows_by_group.setdefault((row.Category, row.PdDistrict), []).append(row)
# One stacked-area subplot per crime category, stacked by district.
for i, crime in enumerate(crimes):
    y = []
    for district in districts:
        district_crime_counts = [0] * 24
        for hour_row in hour_rows_by_group.get((crime, district), []):
            district_crime_counts[hour_row.Time] = hour_row.Count
        y.append(district_crime_counts)
    # NOTE(review): the 20x2 grid supports at most 40 categories.
    fig.add_subplot(20, 2, i + 1)
    plt.gca().xaxis.set_major_locator(MaxNLocator(integer=True))
    plt.stackplot(x, y, labels=districts)
    plt.legend(loc='upper left')
    plt.title(crime)
    plt.ylabel("Counts")
    plt.xlabel("Time since 00:00 AM")
plt.show()